8.1 并发的含义
在开始本章之前,需要了解并发(concurrency)和并行(parallesim)的区别。
- 并发:逻辑上具备同时处理多个任务的能力。
- 并行:物理上在同一时刻执行多个并发任务。
我们通常会说程序是并发设计的,也就是说它允许多个任务同时执行,但实际上并不一定真在同一时刻发生。在单核处理器上,它们能以间隔方式切换执行。而并行则依赖多核处理器等物理设备,让多个任务真正在同一时刻执行,它代表了当前程序运行状态。简单点说,并行是并发设计的理想执行模式。
Concurrency is not parallelism:Different concurrent designs enable different ways to parallelize.
多线程或多进程是并行的基本条件,但单线程也可用协程(coroutine)做到并发。尽管协程在单个线程上通过主动切换来实现多任务并发,但它也有自己的优势。除了将因阻塞而浪费的时间找回来外,还免去了线程切换开销,有着不错的执行效率。协程上运行的多个任务本质上是依旧串行的,加上可控自主调度,所以并不需要做同步处理。
即便采用多线程也未必就能并行。Python就因GIL限制,默认只能并发而不能并行,所以很多时候转而使用“多进程+协程”架构。
很难说哪种方式更好一些,它们有各自适用的场景。通常情况下,用多进程来实现分布式和负载平衡,减轻单进程垃圾回收压力;用多线程(LWP)抢夺更多的处理器资源;用协程来提高处理器时间片利用率。
简单将goroutine归纳为协程并不合适。运行时会创建多个线程来执行并发任务,且任务单元可被调度到其他线程并行执行。这更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。
更多实现细节,请阅读本书下卷《源码剖析》。
只须在函数调用前添加go关键字即可创建并发任务。
go println(“hello,world!”)
go func(s string) { println(s) }(“hello,world!“)
注意是函数调用,所以必须提供相应的参数。
关键字go并非执行并发操作,而是创建一个并发任务单元。新建任务被放置在系统队列中,等待调度器安排合适系统线程去获取执行权。当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行次序。
每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。相比系统默认MB级别的线程栈,goroutine自定义栈初始仅须2 KB,所以才能创建成千上万的并发任务。自定义栈采取按需分配策略,在需要时进行扩容,最大能到GB规模。
在不同版本中,自定义栈大小略有不同。如未做说明,本书特指1.6 amd64。
与defer一样,goroutine也会因“延迟执行”而立即计算并复制执行参数。
var c int
func counter()int{ c++ return c }
func main() { a:=100
go func(x,y int) { time.Sleep(time.Second) // 让goroutine在main逻辑之后执行 println(“go:“,x,y) }(a,counter()) // 立即计算并复制参数
a+=100 println(“main:“,a,counter())
time.Sleep(time.Second*3) // 等待goroutine结束 }
输出:
main:200 2 go:100 1
Wait
进程退出时不会等待并发任务结束,可用通道(channel)阻塞,然后发出退出信号。
func main() { exit:=make(chan struct{}) // 创建通道。因为仅是通知,数据并没有实际意义
go func() { time.Sleep(time.Second) println(“goroutine done.”)
close(exit) // 关闭通道,发出信号
}()
println(“main…”) ←exit // 如通道关闭,立即解除阻塞 println(“main exit.“) }
输出:
main… goroutine done. main exit.
除关闭通道外,写入数据也可解除阻塞。channel的更多信息,后面再做详述。
如要等待多个任务结束,推荐使用sync.WaitGroup。通过设定计数器,让每个goroutine在退出前递减,直至归零时解除阻塞。
import( “sync” “time” )
func main() { var wg sync.WaitGroup
for i:=0;i<10;i++ { wg.Add(1) // 累加计数
go func(id int) {
defer wg.Done() // 递减计数
time.Sleep(time.Second)
println("goroutine",id, "done.")
}(i)
}
println(“main…”) wg.Wait() // 阻塞,直到计数归零 println(“main exit.“) }
输出:
main… goroutine 9 done. goroutine 4 done. goroutine 2 done. goroutine 6 done. goroutine 8 done. goroutine 3 done. goroutine 5 done. goroutine 1 done. goroutine 0 done. goroutine 7 done. main exit.
尽管WaitGroup.Add实现了原子操作,但建议在goroutine外累加计数器,以免Add尚未执行,Wait已经退出。
func main() { var wg sync.WaitGroup
go func() { wg.Add(1) // 来不及设置 println(“hi!“) }()
wg.Wait() println(“exit.“) }
可在多处使用Wait阻塞,它们都能接收到通知。
func main() { var wg sync.WaitGroup wg.Add(1)
go func() { wg.Wait() // 等待归零,解除阻塞 println(“wait exit.“) }()
go func() { time.Sleep(time.Second) println(“done.”) wg.Done() // 递减计数 }()
wg.Wait() // 等待归零,解除阻塞 println(“main exit.“) }
输出:
done. wait exit. main exit.
GOMAXPROCS
运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。该数量默认与处理器核数相等,可用runtime.GOMAXPROCS函数(或环境变量)修改。
如参数小于1,GOMAXPROCS仅返回当前设置值,不做任何调整。
import( “math” “runtime” “sync” )
// 测试目标函数 func count() { x:=0 for i:=0;i<math.MaxUint32;i++ { x+=i }
println(x) }
// 循环执行 func test(n int) { for i:=0;i<n;i++ { count() } }
// 并发执行 func test2(n int) { var wg sync.WaitGroup wg.Add(n)
for i:=0;i<n;i++ { go func() { count() wg.Done() }() }
wg.Wait() }
func main() { n:=runtime.GOMAXPROCS(0) test(n) //test2(n) }
输出:
$time./test
9223372030412324865 9223372030412324865 9223372030412324865 9223372030412324865
real 0m8.395s user 0m8.281s sys 0m0.056s
$time./test2
9223372030412324865 9223372030412324865 9223372030412324865 9223372030412324865
real 0m3.907s // 程序实际执行时间 user 0m14.438s // 多核执行时间累加 sys 0m0.041s
该测试机器是4核,可用runtime.NumCPU函数返回。
Local Storage
与线程不同,goroutine任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。但除优先级外,其他功能都很容易实现。
func main() { var wg sync.WaitGroup var gs[5]struct{ // 用于实现类似TLS功能 id int // 编号 result int // 返回值 }
for i:=0;i<len(gs);i++ { wg.Add(1)
go func(id int) { // 使用参数避免闭包延迟求值
defer wg.Done()
gs[id].id=id
gs[id].result= (id+1) *100
}(i)
}
wg.Wait() fmt.Printf(”%+v\n”,gs) }
输出:
{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}
如使用map作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查。
Gosched
暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行。
func main() { runtime.GOMAXPROCS(1) exit:=make(chan struct{})
go func() { // 任务a defer close(exit)
go func() { // 任务b。放在此处,是为了确保a优先执行
println("b")
}()
for i:=0;i<4;i++ {
println("a:",i)
if i==1{ // 让出当前线程,调度执行b
runtime.Gosched()
}
}
}()
<-exit
}
输出:
a:0 a:1 b a:2 a:3
该函数很少被使用,因为运行时会主动向长时间运行(10 ms)的任务发出抢占调度。只是当前版本实现的算法稍显粗糙,不能保证调度总能成功,所以主动切换还有适用场合。
Goexit
Goexit立即终止当前任务,运行时确保所有已注册延迟调用被执行。该函数不会影响其他并发任务,不会引发panic,自然也就无法捕获。
func main() { exit:=make(chan struct{})
go func() { defer close(exit) // 执行 defer println(“a”) // 执行
func() {
defer func() {
println("b",recover() ==nil) // 执行,recover返回nil
}()
func() { // 在多层调用中执行Goexit
println("c")
runtime.Goexit() // 立即终止整个调用堆栈
println("c done.") // 不会执行
}()
println("b done.") // 不会执行
}()
println("a done.") // 不会执行
}()
<-exit
println(“main exit.“) }
输出:
c b true a main exit.
如果在main.main里调用Goexit,它会等待其他任务结束,然后让进程直接崩溃。
func main() {
for i:=0;i<2;i++ {
go func(x int) {
for n:=0;n<2;n++ {
fmt.Printf(“%c: %d\n”, ‘a’+x,n)
time.Sleep(time.Millisecond)
}
}(i)
}
runtime.Goexit() // 等待所有任务结束 println(“main exit.“) }
输出:
b:0 a:0 b:1 a:1 fatal error:no goroutines(main called runtime.Goexit) -deadlock!
无论身处哪一层,Goexit都能立即终止整个调用堆栈,这与return仅退出当前函数不同。
标准库函数os.Exit可终止进程,但不会执行延迟调用。